dbt
. I have an Airflow
. Ugh Astronomer Cosmos
~圖片來源:前幾天剛發布的 Cosmos 1.6
其實不能說是 dbt + Airflow = Cosmos,而是 dbt + Astronomer Cosmos = Airflow DAG 才對,在還沒有 Cosmos 之前,dbt 和 Airflow 基本上是兩個沒什麼相關的工具,使用場景也不太一樣,dbt 只負責資料轉換的部分,但 airflow 通常就是整條 data pipeline 的核心,過去如果要在 airflow 當中使用 dbt,通常就是用 BashOperator,這樣會導致 dbt 變成一個黑盒子,開發和維護上都很痛苦。
後來~我總算學會了~如何去...用 Astronomer Cosmos
,就能將 airflow 排程、除錯、監控等等優點都應用上去啦!
其實除了 Astronomer Cosmos
,也有其他工具可以將 dbt 結合 Airflow,像是這個 airflow-dbt 和 dbt-airflow,結果測試完發現不是沒在維護就是不夠成熟,結果還是主要負責維護 Airflow 的公司 Astronomer
感覺最保險可靠啦~去年介紹 Airflow 的時候也有提到 Astronomer
,可以回去看看Apache Airflow 的前世今生!
BashOperator
from airflow.operators.bash_operator import BashOperator
dbt_run = BashOperator(
task_id='dbt_run',
bash_command='dbt run',
dag=dag
)
過去的方法像是上方的範例,dbt_run
會將整個 dbt 全部執行,中間如果有錯,就需要整個重新跑,需要 dbt test
又需要新增一個 BashOperator
,如果想將每個 model 層級或是單一 model 拆開,就需要自行寫 dbt run --models {model_folder}
或 dbt run --select {model.sql}
,開發和維護上非常麻煩。
最早討論是 2020 年到 2021 年間, Astronomer 和 Updater 共同發佈了三篇關於 dbt 和 airflow 的文章(下方參考資料),後續經過無數驗證和修改,最終 Astronomer 於 2022 年 12 月 14 日發布了 Cosmos 的首個版本 (1.0.0),核心概念是透過分析 dbt 當中 manifest.json
檔案,以「Airflow 原生」的方式將 dbt 專案整合到 Airflow 中,只需運用DbtTaskGroup
和 DbtDAG
就能將 dbt 當中的 model 轉換成 Airflow 當中的 TaskGroup 和 DAG。
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
jaffle_shop = DbtTaskGroup(
project_config=ProjectConfig("/path/to/jaffle_shop"),
profile_config=ProfileConfig(
profile_name="my_profile",
target_name="my_target",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="my_postgres_dbt",
profile_args={"schema": "public"},
),
)
)
如同下方的圖顯示,使用 Astronomer Cosmos
之後會發現 dbt 在 Airflow 當中的執行不再是黑盒子了,在 TaskGroup
當中可以查看所有 上下游的關係,即便有任一執行出錯,可能單獨重新執行,過去每次執行可能要花 1 小時,有錯誤又要再一小時的地獄,終於逃脫了!
圖片來源:cosmos blog
明天就會從環境架設、結構說明,帶著大家一一認識當中的檔案和連接方式,GO~GO~